package com.uxsino.reactorq.subscriber;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.Destination;
import javax.jms.JMSException;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.uxsino.reactorq.commons.IMessageWithReceiver;
import com.uxsino.reactorq.commons.JsonSerialUtil;
import com.uxsino.reactorq.commons.QueueSubscriber;
import com.uxsino.reactorq.commons.ReactorQFactory;
import com.uxsino.reactorq.commons.TopicSubscriber;
import com.uxsino.reactorq.event.Event;

public class EventSubscriber implements Subscriber<Event> {
    private static Logger logger = LoggerFactory.getLogger(EventSubscriber.class);

    private ReactorQFactory rqFactory;

    private String surfix;

    private String from;

    private ObjectMapper mapper = new ObjectMapper();

    private Map<String, TopicSubscriber<String>> subscribers = new HashMap<>();

    private Map<String, TopicSubscriber<String>> subscriberCache = new ConcurrentHashMap<>();

    private Map<String, QueueSubscriber<String>> queueSubscriberCache = new ConcurrentHashMap<>();
    
    private Map<String, QueueSubscriber<Object>> queueObjectSubscriberCache = new ConcurrentHashMap<>();

    public EventSubscriber(ReactorQFactory factory) {
        this.rqFactory = factory;
        this.surfix = "";
    }

    public EventSubscriber(ReactorQFactory factory, String surfix) {
        this.rqFactory = factory;
        this.surfix = surfix;
    }

    @Override
    public void onNext(Event event) {
        String topic = getEventTopicName(event.getClass());
        TopicSubscriber<String> subscriber = null;
        if (subscribers.containsKey(topic)) {
            subscriber = subscribers.get(topic);
            if (subscriber.checkSessionClosed()) {
                try {
                    subscriber = rqFactory.<String> createTopicSubscriber(topic, String.class);
                } catch (JMSException e) {
                    logger.error("error connecting to activemq broker {}. event message will not be sent.",
                        rqFactory.getBrokerURL());
                }
                subscribers.put(topic, subscriber);
            }

        } else {
            try {
                subscriber = rqFactory.<String> createTopicSubscriber(topic, String.class);
                subscribers.put(topic, subscriber);
            } catch (JMSException e) {
                logger.error("error connecting to activemq broker {}. event message will not be sent.",
                    rqFactory.getBrokerURL());
                return;
            }

        }

        event.setFrom(from);
        JsonNode node = mapper.valueToTree(event);
        try {
            String eventStr = mapper.writeValueAsString(node);
            subscriber.onNext(eventStr,
                event instanceof IMessageWithReceiver ? ((IMessageWithReceiver) event).getReceiverId() : null,
                (Destination) event.replyTo);
        } catch (JsonProcessingException e) {
            logger.error("error serializing event into json.", e);
        }
    }

    private String getEventTopicName(Class<? extends Event> eventClass) {
        return "simo-event-" + Event.getEventName(eventClass) + surfix;

    }

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE - 1);

    }

    @Override
    public void onError(Throwable t) {
        logger.error("error: ", t.getMessage());
    }

    @Override
    public void onComplete() {
        // Do nothing by default

    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getFrom() {
        return from;
    }

    public TopicSubscriber<String> getCachedSubscriberOnTopic(String topic) throws JMSException {
        TopicSubscriber<String> subscriber = subscriberCache.get(topic);
        if (subscriber == null) {
            subscriber = rqFactory.<String> createTopicSubscriber(topic, String.class);
            subscriberCache.put(topic, subscriber);
        }
        // 由于activemq端异常导致session失效，需要重新建立session
        if (subscriber.checkSessionClosed()) {
            subscriber = rqFactory.<String> createTopicSubscriber(topic, String.class);
            subscriberCache.put(topic, subscriber);
        }
        return subscriber;
    }

    public void sendStringOnTopic(String topic, String msg) throws JMSException {
        TopicSubscriber<String> subscriber = getCachedSubscriberOnTopic(topic);
        subscriber.onNext(msg);
    }

    public void sendObjectOnTopic(String topic, Object object) throws JMSException {
        TopicSubscriber<String> subscriber = getCachedSubscriberOnTopic(topic);

        subscriber.onNext(JsonSerialUtil.toJson(object));
    }

    public QueueSubscriber<String> getCachedSubscriberOnQueue(String queue) throws JMSException {
        QueueSubscriber<String> subscriber = queueSubscriberCache.get(queue);
        if (subscriber == null) {
            subscriber = rqFactory.<String> createQueueSubscriber(queue, String.class);
            queueSubscriberCache.put(queue, subscriber);
        }
        // 由于activemq端异常导致session失效，需要重新建立session
        if (subscriber.checkSessionClosed()) {
            subscriber = rqFactory.<String> createQueueSubscriber(queue, String.class);
            queueSubscriberCache.put(queue, subscriber);
        }
        return subscriber;
    }
    
    public QueueSubscriber<Object> getCachedObjectSubscriberOnQueue(String queue, boolean acknowledge) throws JMSException {
        QueueSubscriber<Object> subscriber = queueObjectSubscriberCache.get(queue);
        //由于activemq端异常导致session失效，需要重新建立session
        if (subscriber == null || subscriber.checkSessionClosed()) {
            subscriber = rqFactory.<Object> createQueueSubscriber(queue, Object.class, acknowledge);
            queueObjectSubscriberCache.put(queue, subscriber);
        }
        return subscriber;
    }

    public void sendStringOnQueue(String queue, String msg) throws JMSException {
        QueueSubscriber<String> subscriber = getCachedSubscriberOnQueue(queue);
        subscriber.onNext(msg);
    }

    public void sendObjectOnQueue(String queue, Object object) throws JMSException {
        QueueSubscriber<String> subscriber = getCachedSubscriberOnQueue(queue);
        subscriber.onNext(JsonSerialUtil.toJson(object));
    }

    public void sendObjectOnQueue(String queue, Object object, boolean acknowledge) throws JMSException {
        QueueSubscriber<Object> subscriber = getCachedObjectSubscriberOnQueue(queue, acknowledge);
        subscriber.onNext(object);
    }

}
